Add lambda support and array_transform udf #21679
Conversation
```rust
/// A RecordBatch with the captured columns inside the lambda body, if any
///
/// For example, for `array_transform([2], v -> v + a + b)`,
/// this will be a `RecordBatch` with columns `a` and `b`
captures: Option<RecordBatch>,
```
I don't think we should have this for column capture, and I think the design of column capture itself would need a long review, so I'm fine with not including this preparation for column capture now and making a breaking change in a different PR (I'm also referring to the rest of the breaking changes made to support column capture, like the added evaluate argument).
For example, I think we should have a ProjectionExpr and use the existing logic for projecting, rather than getting all the arrays in the evaluate function.
I agree, reverted at 9cb4882
cc @LiaCastaneda
```rust
pub fn evaluate(
    &self,
    args: &[&dyn Fn() -> Result<ArrayRef>],
    _adjust: impl FnOnce(&[ArrayRef]) -> Result<Vec<ArrayRef>>,
```
I would revert this as well, as explained before.
```rust
/// The return of [HigherOrderUDF::lambda_parameters]
pub enum LambdaParametersProgress {
    Partial(Vec<Option<Vec<FieldRef>>>),
    Complete(Vec<Vec<FieldRef>>),
}
```
I think this needs more comments explaining what each variant is and when you should use each one (I know you added some comments regarding this on lambda_parameters, but some should also be here).
Indeed, it's much better now, thanks 15a0106
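For reference, a hedged sketch of how the documented enum could read (the wording is illustrative, not the actual content of 15a0106; the variant semantics follow the multi-step resolution discussed below):

```rust
use arrow_schema::FieldRef;

/// The progress of multi-step lambda parameter resolution.
pub enum LambdaParametersProgress {
    /// Some lambdas' parameters cannot be determined yet, typically because
    /// they depend on the planned types of other lambdas. `None` marks the
    /// still-unknown lambdas; the planner plans the resolved ones and calls
    /// `lambda_parameters` again with the extra type information.
    Partial(Vec<Option<Vec<FieldRef>>>),
    /// Every lambda argument's parameters are known; planning can proceed
    /// without calling `lambda_parameters` again.
    Complete(Vec<Vec<FieldRef>>),
}
```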
```rust
    Lambda(L),
}

/// The return of [HigherOrderUDF::lambda_parameters]
```
The comment should explain the enum and not the function that returns it, IMO.
I agree we should move the column capture support to a follow-up PR, since it would involve more design discussion, and this PR already proves that most lambda functions can be implemented easily -- I don't think we should block on it. Left some comments on the multi-step type resolution you introduced in the last commits.
```rust
    if step > 256 {
        return plan_err!(
            "{} lambda_parameters called 256 times without completion",
            fm.name()
        );
    }
};
```
🤔 This limit feels arbitrary. Even if I can't imagine a situation where a user has 256+ lambdas in a higher-order function, each outputting a different type, in theory implementors could express this. Since implementors know beforehand how their higher-order function typing works, it might make sense to include something like max_lambda_parameter_steps: usize in HigherOrderSignature, defaulting to 1?
It's missing a comment, indeed. It's mostly a guard against a faulty lambda_parameters that causes an infinite loop by endlessly returning Partials. I agree that making it configurable is better, but I'll stick to 256 as the default so that ideally nobody ever has to think about it 9f19fed
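A minimal sketch of the guard plus the configurable limit discussed here, assuming a `max_steps` knob like the proposed `max_lambda_parameter_steps` (the function name, driver shape, and error type are illustrative, not the code from 9f19fed):

```rust
use arrow_schema::FieldRef;

pub enum LambdaParametersProgress {
    Partial(Vec<Option<Vec<FieldRef>>>),
    Complete(Vec<Vec<FieldRef>>),
}

/// Drives `lambda_parameters` to completion, bailing out after `max_steps`
/// rounds so a faulty implementation that keeps returning `Partial`
/// cannot hang planning.
fn resolve_lambda_parameters(
    udf_name: &str,
    max_steps: usize,
    mut next: impl FnMut(usize) -> LambdaParametersProgress,
) -> Result<Vec<Vec<FieldRef>>, String> {
    let mut step = 0;
    loop {
        if step > max_steps {
            return Err(format!(
                "{udf_name} lambda_parameters called {max_steps} times without completion"
            ));
        }
        match next(step) {
            LambdaParametersProgress::Complete(all) => return Ok(all),
            // Plan the lambdas that are resolvable so far, then try again.
            LambdaParametersProgress::Partial(_) => step += 1,
        }
    }
}
```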
```rust
fn coerce_values_for_lambdas(
    &self,
    _fields: &[ValueOrLambda<DataType, DataType>],
) -> Result<Vec<DataType>> {
```
coerce_values_for_lambdas and coerce_value_types do the same thing, but one is called after the lambdas are planned right? iiuc, you added a separate function because you need to see both lambdas and values via ValueOrLambda. If that's the only reason for having a separate function, maybe we should just have one function that receives &[ValueOrLambda<DataType, DataType>]? In the other PR I suggested having only &[DataType] because I couldn't think of a near term use case that would actually need both.
Not sure what the cleaner approach is here -- having two APIs that do the same thing, or having one API with &[ValueOrLambda<DataType, DataType>] as the parameter.
Also, I see this function is only called if the coerce_values_for_lambdas flag is set to true in the signature, which can be easy for the implementor to forget.
This is also missing an explanation. While unlikely, it's still valid that coerce_value_types coercion changes the input values in a way that also changes the lambda parameters: most cases will coerce from ListView(Int32) to List(Int32), for example (the lambda parameter is still Int32), but it may also coerce into a List(Int64) (the lambda parameter is now Int64). So we need to call coerce_value_types before calling lambda_parameters, and then use its output to call coerce_values_for_lambdas. 8f3b0fe
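To make the coercion example concrete, here is a self-contained sketch (the helper `lambda_param_type` is hypothetical; it only illustrates that the lambda parameter type must be derived from the coerced list type, not the original one):

```rust
use std::sync::Arc;
use arrow_schema::{DataType, Field};

// Hypothetical helper: for array_transform, the lambda parameter type is the
// element type of the (already coerced) list argument.
fn lambda_param_type(coerced: &DataType) -> Option<DataType> {
    match coerced {
        DataType::List(f) | DataType::LargeList(f) | DataType::ListView(f) => {
            Some(f.data_type().clone())
        }
        _ => None,
    }
}

fn main() {
    let before = DataType::ListView(Arc::new(Field::new("item", DataType::Int32, true)));
    let after = DataType::List(Arc::new(Field::new("item", DataType::Int64, true)));
    // Coercion may change the element type, so lambda_parameters has to be
    // called with the coerced types: Int32 before, Int64 after.
    assert_eq!(lambda_param_type(&before), Some(DataType::Int32));
    assert_eq!(lambda_param_type(&after), Some(DataType::Int64));
}
```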
> In the other PR I suggested having only &[DataType] because I couldn't think of a near-term use case that would actually need both.
I believe at that time it was ValueOrLambda<DataType, ()> only to detect bad arg positions, but that function is likely not the place for it, so your suggestion still makes sense even taking into account the array_reduce signature.
> Also, I see this function is only called if the coerce_values_for_lambdas flag is set to true in the signature, which can be easy for the implementor to forget.
Yeah, that's true. When adding it I thought about removing the flag and changing the return type to Result<Option<Vec<DataType>>> with a default impl returning Ok(None), so that it would always be called but do nothing by default, but I didn't like the signature much. WDYT?
> This is also missing an explanation. While unlikely, it's still valid that coerce_value_types coercion changes the input values in a way that also changes the lambda parameters: most cases will coerce from ListView(Int32) to List(Int32), for example (the lambda parameter is still Int32), but it may also coerce into a List(Int64) (the lambda parameter is now Int64). So we need to call coerce_value_types before calling lambda_parameters, and then use its output to call coerce_values_for_lambdas. 8f3b0fe

Is there a test for that?
@LiaCastaneda after this is resolved and there are no further comments, I'll merge this PR.
expanded an existing test to cover this c93fc81
> Yeah, that's true. When adding it I thought about removing the flag and changing the return type to Result<Option<Vec<DataType>>> with a default impl returning Ok(None), so that it would always be called but do nothing by default, but I didn't like the signature much. WDYT?
I understand. IMO, even if it's not the prettiest signature, Result<Option<Vec<DataType>>> removes an extra thing the implementor has to remember; the contract is in the return type itself. There are other traits/APIs in DataFusion that also return Result<Option<...>>, so I guess it's fine to have it like that.
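A sketch of that shape, with illustrative names rather than the merged API: the method is always called, and the default impl opts out via `Ok(None)`, so there is no separate flag to remember:

```rust
use arrow_schema::DataType;

type Result<T> = std::result::Result<T, String>;

pub enum ValueOrLambda<V, L> {
    Value(V),
    Lambda(L),
}

pub trait HigherOrderUdfSketch {
    /// Always called during coercion; the default impl returns Ok(None),
    /// meaning "no lambda-aware coercion needed", so implementors only
    /// override it when they actually care.
    fn coerce_values_for_lambdas(
        &self,
        _fields: &[ValueOrLambda<DataType, DataType>],
    ) -> Result<Option<Vec<DataType>>> {
        Ok(None)
    }
}
```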
I tried to build it. Good thing it works. From the performance point of view there are some improvements that can be made for lambdas, but likely we can address them in a follow-up, @gstvg it is up to you.
@comphead That's great. I'm almost sure none of these improvements are breaking, so we can address them in another PR. I added it to #21172. CI failure seems unrelated: https://github.com/apache/datafusion/actions/runs/25037217756/job/73331583409#step:7:919
@comphead Regarding the performance optimization, I'm adding some helpers in arrow-rs that will handle sliced arrays and cleanup of nulls to make it super fast, which we can then use here later.
@comphead looks like you have no other comments, and I approved, so I will wait for the last @LiaCastaneda comment to be resolved and then I'll merge this.
Big thank you to @gstvg and everyone involved!
Thanks everyone! This is a huge one!
This is pretty amazing -- I put a note to include it in the 55 release's notes:
## Which issue does this PR close?

- Closes #. Followup on #21679 (comment)

Co-authored-by: Raz Luvaton <16746759+rluvaton@users.noreply.github.com>
Many thanks to everyone involved, both reviewers and those who showed interest in the feature. Reviewing such a big PR is not easy and I'm very grateful for it, thanks again ❤️
This is amazing. Thank you so much for all the hard work on this. This is definitely one of the things I am most excited about in the next release. This is going to be huge!
I just wanted to bring to attention that DuckDB deprecated this very syntax because of conflicts with JSON operators. I guess we may want to support both long term (Spark uses the arrow syntax), but I think there's a real risk that we are not even evaluating incompatibility with JSON operators, because they are not implemented by default in DataFusion (but we are in talks to do so, #21301). Happy to open an issue for discussion, but I wanted to check first if this was discussed at all; as far as I can tell from going over the PR, it has not.
@adriangb I think this decision can be left to the user via the configurable dialect, as it is today. This PR merely consumes the LambdaFunction from the sqlparser-rs AST; which syntax it parses is defined by the configured dialect. It's up to sqlparser to avoid conflicts (see apache/datafusion-sqlparser-rs#2224). The sqllogictests here require setting the dialect to databricks, for example. I guess what we can do here is:
This is a clean version of #18921 to make it easier to review.
This is a breaking change due to adding variants to the `Expr` enum, new methods on the traits `Session`, `FunctionRegistry` and `ContextProvider`, and a new arg on `TaskContext::new`.

This PR adds support for lambdas and the `array_transform` function used to test the lambda implementation.

Example usage:
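A hypothetical sketch of such a usage (the query and setup below are illustrative; per the discussion further down, the sqllogictests set the dialect to databricks for the lambda syntax):

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // The default dialect may not accept `->` lambdas; the PR's sqllogictests
    // set the dialect to databricks.
    ctx.sql("SET datafusion.sql_parser.dialect = 'databricks'")
        .await?;
    // Add 1 to each element: [1, 2, 3] becomes [2, 3, 4]
    let df = ctx
        .sql("SELECT array_transform([1, 2, 3], v -> v + 1) AS plus_one")
        .await?;
    df.show().await?;
    Ok(())
}
```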
Note: column capture has been removed for now and will be added in a follow-up PR, see #21172
Some comments on code snippets in this doc show what value each struct, variant or field would hold after planning the first example above. Some literals are simplified pseudocode.
3 new `Expr` variants are added: `HigherOrderFunction`, owning a new trait `HigherOrderUDF`, which is like a `ScalarFunction`/`ScalarUDFImpl` with support for lambdas; `Lambda`, for the lambda body and its parameter names; and `LambdaVariable`, which is like `Column` but for lambda parameters.

Their logical representations:
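As a reading aid, a hypothetical sketch of the three variants (the field shapes are assumptions, not the PR's exact definitions):

```rust
use arrow_schema::FieldRef;

pub struct Lambda {
    /// Parameter names, e.g. ["v"] for `v -> v + 1`
    pub params: Vec<String>,
    /// The lambda body expression
    pub body: Box<Expr>,
}

pub struct LambdaVariable {
    /// The lambda parameter name this variable refers to
    pub name: String,
    /// Resolved at construction time in the SQL planner (see below)
    pub field: FieldRef,
}

pub enum Expr {
    // ...existing variants elided...
    /// A call to a HigherOrderUDF such as array_transform
    HigherOrderFunction { name: String, args: Vec<Expr> },
    /// A lambda body plus its parameter names
    Lambda(Lambda),
    /// Like Column, but referring to a lambda parameter
    LambdaVariable(LambdaVariable),
}
```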
The example would be planned into a tree like this:
The physical counterparts' definitions:
Note: For those who primarily want to check whether this lambda implementation supports their use case and don't want to spend much time here, it's okay to skip most collapsed blocks, as those serve mostly to help code reviewers, with the exception of `HigherOrderUDF` and the `array_transform` implementations of the relevant `HigherOrderUDF` methods, collapsed due to their size.

The added `HigherOrderUDF` trait is almost a clone of `ScalarUDFImpl`, with the exception of:

- `return_field_from_args` and `invoke_with_args`, where now `args.args` is a list of enums with two variants, `Value` or `Lambda`, instead of a list of values
- `lambda_parameters`, which returns a `Field` for each parameter supported for every lambda argument, based on the `Field`s of the non-lambda arguments
- `return_field` and the deprecated ones `is_nullable` and `display_name`

HigherOrderUDF
array_transform lambda_parameters implementation
array_transform return_field_from_args implementation
array_transform invoke_with_args implementation
How relevant HigherOrderUDF methods would be called and what they would return during planning and evaluation of the example
A `HigherOrderUDF`/`HigherOrderUDFImpl` pair, like `ScalarFunction`'s, was not used because those exist only to maintain backwards compatibility with the older API #8045
Why `LambdaVariable` and not `Column`:

Existing tree traversals that operate on columns would break if some column nodes referenced a lambda parameter and not a real column. In the example query, projection pushdown would try to push the lambda parameter "v", which won't exist in table "t".
Example code of another traversal that would break:
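A self-contained sketch of the kind of traversal meant here, using a simplified expression enum rather than DataFusion's actual types:

```rust
// Simplified stand-in for the logical expression tree, not DataFusion's Expr.
enum Expr {
    Column(String),
    BinaryOp(Box<Expr>, Box<Expr>),
}

// A traversal of the kind projection pushdown relies on: it assumes every
// Column node names a real table column.
fn collect_columns(expr: &Expr, out: &mut Vec<String>) {
    match expr {
        Expr::Column(name) => out.push(name.clone()),
        Expr::BinaryOp(l, r) => {
            collect_columns(l, out);
            collect_columns(r, out);
        }
    }
}

fn main() {
    // If the lambda parameter `v` were a plain Column, the body of
    // `array_transform(t.arr, v -> v + a)` would contribute "v" here, and
    // projection pushdown would try to read a column "v" from table "t".
    let body = Expr::BinaryOp(
        Box::new(Expr::Column("v".into())),
        Box::new(Expr::Column("a".into())),
    );
    let mut cols = Vec::new();
    collect_columns(&body, &mut cols);
    assert_eq!(cols, ["v", "a"]); // "v" is wrongly treated as a table column
}
```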
Furthermore, the implementations of `ExprSchemable` and `PhysicalExpr::return_field` for `Column` expect that the schema they receive as an argument contains an entry for the column's name, which is not the case for lambda parameters.

By including a `FieldRef` on `LambdaVariable` that should be resolved at construction time in the SQL planner, `ExprSchemable` and `PhysicalExpr::return_field` simply return its own `Field`:

LambdaVariable ExprSchemable and PhysicalExpr::return_field implementation
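A minimal sketch of that idea, assuming a simplified `return_field` signature (the real one takes different arguments): the variable carries its own `FieldRef` and never consults the schema:

```rust
use std::sync::Arc;
use arrow_schema::{DataType, Field, FieldRef, Schema};

struct LambdaVariable {
    name: String,
    /// Resolved once, at construction time, in the SQL planner
    field: FieldRef,
}

impl LambdaVariable {
    // Unlike Column, this never looks `self.name` up in the schema:
    // lambda parameters have no entry there.
    fn return_field(&self, _schema: &Schema) -> FieldRef {
        Arc::clone(&self.field)
    }
}

fn main() {
    let v = LambdaVariable {
        name: "v".to_string(),
        field: Arc::new(Field::new("v", DataType::Int32, true)),
    };
    // Works even against an empty schema, where a Column lookup would fail.
    assert_eq!(v.name, "v");
    assert_eq!(
        v.return_field(&Schema::empty()).data_type(),
        &DataType::Int32
    );
}
```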
Possible alternatives discarded due to complexity, requiring downstream changes and implementation size:
How minimize_join_filter would look:
For any given `HigherOrderFunction` found during the traversal, a new schema is created for each lambda argument containing its parameters, returned from `HigherOrderUDF::lambda_parameters`.
How it would look:
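A hedged sketch of the schema construction, assuming a plain arrow `Schema` and an illustrative helper name:

```rust
use std::sync::Arc;
use arrow_schema::{DataType, Field, FieldRef, Schema};

// Illustrative helper: extend the input schema with the parameter fields
// that lambda_parameters returned for one lambda argument.
fn schema_for_lambda(input: &Schema, lambda_params: &[FieldRef]) -> Schema {
    let mut fields: Vec<FieldRef> = input.fields().iter().cloned().collect();
    // The parameters are appended so that LambdaVariable resolution can find
    // them while planning the lambda body.
    fields.extend(lambda_params.iter().cloned());
    Schema::new(fields)
}

fn main() {
    let item = Arc::new(Field::new("item", DataType::Int32, true));
    let input = Schema::new(vec![Field::new("arr", DataType::List(item), true)]);
    let params: Vec<FieldRef> = vec![Arc::new(Field::new("v", DataType::Int32, true))];
    let with_lambda = schema_for_lambda(&input, &params);
    assert_eq!(with_lambda.fields().len(), 2);
    assert!(with_lambda.field_with_name("v").is_ok());
}
```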